1   package org.apache.lucene.store;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements.  See the NOTICE file distributed with
6    * this work for additional information regarding copyright ownership.
7    * The ASF licenses this file to You under the Apache License, Version 2.0
8    * (the "License"); you may not use this file except in compliance with
9    * the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  import java.io.EOFException;
21  import java.io.IOException;
22  import java.nio.BufferUnderflowException;
23  import java.nio.ByteBuffer;
24  import java.util.Iterator;
25  
26  import org.apache.lucene.util.WeakIdentityMap;
27  
28  /**
29   * Base IndexInput implementation that uses an array
30   * of ByteBuffers to represent a file.
31   * <p>
32   * Because Java's ByteBuffer uses an int to address the
 * values, it's necessary to access a file greater than
 * Integer.MAX_VALUE in size using multiple byte buffers.
35   * <p>
 * For efficiency, this class requires that the buffers
 * have a power-of-two size (<code>chunkSizePower</code>).
38   */
39  abstract class ByteBufferIndexInput extends IndexInput implements RandomAccessInput {
40    protected final BufferCleaner cleaner;  
41    protected final long length;
42    protected final long chunkSizeMask;
43    protected final int chunkSizePower;
44    
45    protected ByteBuffer[] buffers;
46    protected int curBufIndex = -1;
47    protected ByteBuffer curBuf; // redundant for speed: buffers[curBufIndex]
48  
49    protected boolean isClone = false;
50    protected final WeakIdentityMap<ByteBufferIndexInput,Boolean> clones;
51    
52    public static ByteBufferIndexInput newInstance(String resourceDescription, ByteBuffer[] buffers, long length, int chunkSizePower, BufferCleaner cleaner, boolean trackClones) {
53      final WeakIdentityMap<ByteBufferIndexInput,Boolean> clones = trackClones ? WeakIdentityMap.<ByteBufferIndexInput,Boolean>newConcurrentHashMap() : null;
54      if (buffers.length == 1) {
55        return new SingleBufferImpl(resourceDescription, buffers[0], length, chunkSizePower, cleaner, clones);
56      } else {
57        return new MultiBufferImpl(resourceDescription, buffers, 0, length, chunkSizePower, cleaner, clones);
58      }
59    }
60    
61    ByteBufferIndexInput(String resourceDescription, ByteBuffer[] buffers, long length, int chunkSizePower, BufferCleaner cleaner, WeakIdentityMap<ByteBufferIndexInput,Boolean> clones) {
62      super(resourceDescription);
63      this.buffers = buffers;
64      this.length = length;
65      this.chunkSizePower = chunkSizePower;
66      this.chunkSizeMask = (1L << chunkSizePower) - 1L;
67      this.clones = clones;
68      this.cleaner = cleaner;
69      assert chunkSizePower >= 0 && chunkSizePower <= 30;   
70      assert (length >>> chunkSizePower) < Integer.MAX_VALUE;
71    }
72    
73    @Override
74    public final byte readByte() throws IOException {
75      try {
76        return curBuf.get();
77      } catch (BufferUnderflowException e) {
78        do {
79          curBufIndex++;
80          if (curBufIndex >= buffers.length) {
81            throw new EOFException("read past EOF: " + this);
82          }
83          curBuf = buffers[curBufIndex];
84          curBuf.position(0);
85        } while (!curBuf.hasRemaining());
86        return curBuf.get();
87      } catch (NullPointerException npe) {
88        throw new AlreadyClosedException("Already closed: " + this);
89      }
90    }
91  
92    @Override
93    public final void readBytes(byte[] b, int offset, int len) throws IOException {
94      try {
95        curBuf.get(b, offset, len);
96      } catch (BufferUnderflowException e) {
97        int curAvail = curBuf.remaining();
98        while (len > curAvail) {
99          curBuf.get(b, offset, curAvail);
100         len -= curAvail;
101         offset += curAvail;
102         curBufIndex++;
103         if (curBufIndex >= buffers.length) {
104           throw new EOFException("read past EOF: " + this);
105         }
106         curBuf = buffers[curBufIndex];
107         curBuf.position(0);
108         curAvail = curBuf.remaining();
109       }
110       curBuf.get(b, offset, len);
111     } catch (NullPointerException npe) {
112       throw new AlreadyClosedException("Already closed: " + this);
113     }
114   }
115 
116   @Override
117   public final short readShort() throws IOException {
118     try {
119       return curBuf.getShort();
120     } catch (BufferUnderflowException e) {
121       return super.readShort();
122     } catch (NullPointerException npe) {
123       throw new AlreadyClosedException("Already closed: " + this);
124     }
125   }
126 
127   @Override
128   public final int readInt() throws IOException {
129     try {
130       return curBuf.getInt();
131     } catch (BufferUnderflowException e) {
132       return super.readInt();
133     } catch (NullPointerException npe) {
134       throw new AlreadyClosedException("Already closed: " + this);
135     }
136   }
137 
138   @Override
139   public final long readLong() throws IOException {
140     try {
141       return curBuf.getLong();
142     } catch (BufferUnderflowException e) {
143       return super.readLong();
144     } catch (NullPointerException npe) {
145       throw new AlreadyClosedException("Already closed: " + this);
146     }
147   }
148   
149   @Override
150   public long getFilePointer() {
151     try {
152       return (((long) curBufIndex) << chunkSizePower) + curBuf.position();
153     } catch (NullPointerException npe) {
154       throw new AlreadyClosedException("Already closed: " + this);
155     }
156   }
157 
158   @Override
159   public void seek(long pos) throws IOException {
160     // we use >> here to preserve negative, so we will catch AIOOBE,
161     // in case pos + offset overflows.
162     final int bi = (int) (pos >> chunkSizePower);
163     try {
164       if (bi == curBufIndex) {
165         curBuf.position((int) (pos & chunkSizeMask));
166       } else {
167         final ByteBuffer b = buffers[bi];
168         b.position((int) (pos & chunkSizeMask));
169         // write values, on exception all is unchanged
170         this.curBufIndex = bi;
171         this.curBuf = b;
172       }
173     } catch (ArrayIndexOutOfBoundsException | IllegalArgumentException e) {
174       throw new EOFException("seek past EOF: " + this);
175     } catch (NullPointerException npe) {
176       throw new AlreadyClosedException("Already closed: " + this);
177     }
178   }
179   
180   @Override
181   public byte readByte(long pos) throws IOException {
182     try {
183       final int bi = (int) (pos >> chunkSizePower);
184       return buffers[bi].get((int) (pos & chunkSizeMask));
185     } catch (IndexOutOfBoundsException ioobe) {
186       throw new EOFException("seek past EOF: " + this);
187     } catch (NullPointerException npe) {
188       throw new AlreadyClosedException("Already closed: " + this);
189     }
190   }
191   
192   // used only by random access methods to handle reads across boundaries
193   private void setPos(long pos, int bi) throws IOException {
194     try {
195       final ByteBuffer b = buffers[bi];
196       b.position((int) (pos & chunkSizeMask));
197       this.curBufIndex = bi;
198       this.curBuf = b;
199     } catch (ArrayIndexOutOfBoundsException | IllegalArgumentException aioobe) {
200       throw new EOFException("seek past EOF: " + this);
201     } catch (NullPointerException npe) {
202       throw new AlreadyClosedException("Already closed: " + this);
203     }
204   }
205 
206   @Override
207   public short readShort(long pos) throws IOException {
208     final int bi = (int) (pos >> chunkSizePower);
209     try {
210       return buffers[bi].getShort((int) (pos & chunkSizeMask));
211     } catch (IndexOutOfBoundsException ioobe) {
212       // either it's a boundary, or read past EOF, fall back:
213       setPos(pos, bi);
214       return readShort();
215     } catch (NullPointerException npe) {
216       throw new AlreadyClosedException("Already closed: " + this);
217     }
218   }
219 
220   @Override
221   public int readInt(long pos) throws IOException {
222     final int bi = (int) (pos >> chunkSizePower);
223     try {
224       return buffers[bi].getInt((int) (pos & chunkSizeMask));
225     } catch (IndexOutOfBoundsException ioobe) {
226       // either it's a boundary, or read past EOF, fall back:
227       setPos(pos, bi);
228       return readInt();
229     } catch (NullPointerException npe) {
230       throw new AlreadyClosedException("Already closed: " + this);
231     }
232   }
233 
234   @Override
235   public long readLong(long pos) throws IOException {
236     final int bi = (int) (pos >> chunkSizePower);
237     try {
238       return buffers[bi].getLong((int) (pos & chunkSizeMask));
239     } catch (IndexOutOfBoundsException ioobe) {
240       // either it's a boundary, or read past EOF, fall back:
241       setPos(pos, bi);
242       return readLong();
243     } catch (NullPointerException npe) {
244       throw new AlreadyClosedException("Already closed: " + this);
245     }
246   }
247 
248   @Override
249   public final long length() {
250     return length;
251   }
252 
253   @Override
254   public final ByteBufferIndexInput clone() {
255     final ByteBufferIndexInput clone = buildSlice((String) null, 0L, this.length);
256     try {
257       clone.seek(getFilePointer());
258     } catch(IOException ioe) {
259       throw new AssertionError(ioe);
260     }
261     
262     return clone;
263   }
264   
265   /**
266    * Creates a slice of this index input, with the given description, offset, and length. The slice is seeked to the beginning.
267    */
268   @Override
269   public final ByteBufferIndexInput slice(String sliceDescription, long offset, long length) {    
270     if (offset < 0 || length < 0 || offset+length > this.length) {
271       throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset + ",length=" + length + ",fileLength="  + this.length + ": "  + this);
272     }
273     
274     return buildSlice(sliceDescription, offset, length);
275   }
276 
277   /** Builds the actual sliced IndexInput (may apply extra offset in subclasses). **/
278   protected ByteBufferIndexInput buildSlice(String sliceDescription, long offset, long length) {
279     if (buffers == null) {
280       throw new AlreadyClosedException("Already closed: " + this);
281     }
282 
283     final ByteBuffer newBuffers[] = buildSlice(buffers, offset, length);
284     final int ofs = (int) (offset & chunkSizeMask);
285     
286     final ByteBufferIndexInput clone = newCloneInstance(getFullSliceDescription(sliceDescription), newBuffers, ofs, length);
287     clone.isClone = true;
288 
289     // register the new clone in our clone list to clean it up on closing:
290     if (clones != null) {
291       this.clones.put(clone, Boolean.TRUE);
292     }
293     
294     return clone;
295   }
296 
297   /** Factory method that creates a suitable implementation of this class for the given ByteBuffers. */
298   @SuppressWarnings("resource")
299   protected ByteBufferIndexInput newCloneInstance(String newResourceDescription, ByteBuffer[] newBuffers, int offset, long length) {
300     if (newBuffers.length == 1) {
301       newBuffers[0].position(offset);
302       return new SingleBufferImpl(newResourceDescription, newBuffers[0].slice(), length, chunkSizePower, this.cleaner, this.clones);
303     } else {
304       return new MultiBufferImpl(newResourceDescription, newBuffers, offset, length, chunkSizePower, cleaner, clones);
305     }
306   }
307   
308   /** Returns a sliced view from a set of already-existing buffers: 
309    *  the last buffer's limit() will be correct, but
310    *  you must deal with offset separately (the first buffer will not be adjusted) */
311   private ByteBuffer[] buildSlice(ByteBuffer[] buffers, long offset, long length) {
312     final long sliceEnd = offset + length;
313     
314     final int startIndex = (int) (offset >>> chunkSizePower);
315     final int endIndex = (int) (sliceEnd >>> chunkSizePower);
316 
317     // we always allocate one more slice, the last one may be a 0 byte one
318     final ByteBuffer slices[] = new ByteBuffer[endIndex - startIndex + 1];
319     
320     for (int i = 0; i < slices.length; i++) {
321       slices[i] = buffers[startIndex + i].duplicate();
322     }
323 
324     // set the last buffer's limit for the sliced view.
325     slices[slices.length - 1].limit((int) (sliceEnd & chunkSizeMask));
326     
327     return slices;
328   }
329 
330   @Override
331   public final void close() throws IOException {
332     try {
333       if (buffers == null) return;
334       
335       // make local copy, then un-set early
336       final ByteBuffer[] bufs = buffers;
337       unsetBuffers();
338       if (clones != null) {
339         clones.remove(this);
340       }
341       
342       if (isClone) return;
343       
344       // for extra safety unset also all clones' buffers:
345       if (clones != null) {
346         for (Iterator<ByteBufferIndexInput> it = this.clones.keyIterator(); it.hasNext();) {
347           final ByteBufferIndexInput clone = it.next();
348           assert clone.isClone;
349           clone.unsetBuffers();
350         }
351         this.clones.clear();
352       }
353       
354       for (final ByteBuffer b : bufs) {
355         freeBuffer(b);
356       }
357     } finally {
358       unsetBuffers();
359     }
360   }
361   
362   /**
363    * Called to remove all references to byte buffers, so we can throw AlreadyClosed on NPE.
364    */
365   private void unsetBuffers() {
366     buffers = null;
367     curBuf = null;
368     curBufIndex = 0;
369   }
370 
371   /**
372    * Called when the contents of a buffer will be no longer needed.
373    */
374   private void freeBuffer(ByteBuffer b) throws IOException {
375     if (cleaner != null) {
376       cleaner.freeBuffer(this, b);
377     }
378   }
379   
380   /**
381    * Pass in an implementation of this interface to cleanup ByteBuffers.
382    * MMapDirectory implements this to allow unmapping of bytebuffers with private Java APIs.
383    */
384   static interface BufferCleaner {
385     void freeBuffer(ByteBufferIndexInput parent, ByteBuffer b) throws IOException;
386   }
387   
388   /** Optimization of ByteBufferIndexInput for when there is only one buffer */
389   static final class SingleBufferImpl extends ByteBufferIndexInput {
390 
391     SingleBufferImpl(String resourceDescription, ByteBuffer buffer, long length, int chunkSizePower,
392         BufferCleaner cleaner, WeakIdentityMap<ByteBufferIndexInput,Boolean> clones) {
393       super(resourceDescription, new ByteBuffer[] { buffer }, length, chunkSizePower, cleaner, clones);
394       this.curBufIndex = 0;
395       this.curBuf = buffer;
396       buffer.position(0);
397     }
398     
399     // TODO: investigate optimizing readByte() & Co?
400     
401     @Override
402     public void seek(long pos) throws IOException {
403       try {
404         curBuf.position((int) pos);
405       } catch (IllegalArgumentException e) {
406         if (pos < 0) {
407           throw new IllegalArgumentException("Seeking to negative position: " + this, e);
408         } else {
409           throw new EOFException("seek past EOF: " + this);
410         }
411       } catch (NullPointerException npe) {
412         throw new AlreadyClosedException("Already closed: " + this);
413       }
414     }
415     
416     @Override
417     public long getFilePointer() {
418       try {
419         return curBuf.position();
420       } catch (NullPointerException npe) {
421         throw new AlreadyClosedException("Already closed: " + this);
422       }
423     }
424 
425     @Override
426     public byte readByte(long pos) throws IOException {
427       try {
428         return curBuf.get((int) pos);
429       } catch (IllegalArgumentException e) {
430         if (pos < 0) {
431           throw new IllegalArgumentException("Seeking to negative position: " + this, e);
432         } else {
433           throw new EOFException("seek past EOF: " + this);
434         }
435       } catch (NullPointerException npe) {
436         throw new AlreadyClosedException("Already closed: " + this);
437       }
438     }
439 
440     @Override
441     public short readShort(long pos) throws IOException {
442       try {
443         return curBuf.getShort((int) pos);
444       } catch (IllegalArgumentException e) {
445         if (pos < 0) {
446           throw new IllegalArgumentException("Seeking to negative position: " + this, e);
447         } else {
448           throw new EOFException("seek past EOF: " + this);
449         }
450       } catch (NullPointerException npe) {
451         throw new AlreadyClosedException("Already closed: " + this);
452       }
453     }
454 
455     @Override
456     public int readInt(long pos) throws IOException {
457       try {
458         return curBuf.getInt((int) pos);
459       } catch (IllegalArgumentException e) {
460         if (pos < 0) {
461           throw new IllegalArgumentException("Seeking to negative position: " + this, e);
462         } else {
463           throw new EOFException("seek past EOF: " + this);
464         }
465       } catch (NullPointerException npe) {
466         throw new AlreadyClosedException("Already closed: " + this);
467       }
468     }
469 
470     @Override
471     public long readLong(long pos) throws IOException {
472       try {
473         return curBuf.getLong((int) pos);
474       } catch (IllegalArgumentException e) {
475         if (pos < 0) {
476           throw new IllegalArgumentException("Seeking to negative position: " + this, e);
477         } else {
478           throw new EOFException("seek past EOF: " + this);
479         }
480       } catch (NullPointerException npe) {
481         throw new AlreadyClosedException("Already closed: " + this);
482       }
483     }
484   }
485   
486   /** This class adds offset support to ByteBufferIndexInput, which is needed for slices. */
487   static final class MultiBufferImpl extends ByteBufferIndexInput {
488     private final int offset;
489     
490     MultiBufferImpl(String resourceDescription, ByteBuffer[] buffers, int offset, long length, int chunkSizePower,
491         BufferCleaner cleaner, WeakIdentityMap<ByteBufferIndexInput,Boolean> clones) {
492       super(resourceDescription, buffers, length, chunkSizePower, cleaner, clones);
493       this.offset = offset;
494       try {
495         seek(0L);
496       } catch (IOException ioe) {
497         throw new AssertionError(ioe);
498       }
499     }
500     
501     @Override
502     public void seek(long pos) throws IOException {
503       assert pos >= 0L;
504       super.seek(pos + offset);
505     }
506     
507     @Override
508     public long getFilePointer() {
509       return super.getFilePointer() - offset;
510     }
511     
512     @Override
513     public byte readByte(long pos) throws IOException {
514       return super.readByte(pos + offset);
515     }
516 
517     @Override
518     public short readShort(long pos) throws IOException {
519       return super.readShort(pos + offset);
520     }
521 
522     @Override
523     public int readInt(long pos) throws IOException {
524       return super.readInt(pos + offset);
525     }
526 
527     @Override
528     public long readLong(long pos) throws IOException {
529       return super.readLong(pos + offset);
530     }
531 
532     @Override
533     protected ByteBufferIndexInput buildSlice(String sliceDescription, long ofs, long length) {
534       return super.buildSlice(sliceDescription, this.offset + ofs, length);
535     }
536   }
537 }